// optimize the physical plan chainedProgram.addLast( PHYSICAL, FlinkVolcanoProgramBuilder.newBuilder .add(FlinkStreamRuleSets.PHYSICAL_OPT_RULES) .setRequiredOutputTraits(Array(FlinkConventions.STREAM_PHYSICAL)) .build()) // physical rewrite chainedProgram.addLast( PHYSICAL_REWRITE, FlinkGroupProgramBuilder.newBuilder[StreamOptimizeContext] // add a HEP program for watermark transpose rules to make this optimization deterministic .addProgram( FlinkHepRuleSetProgramBuilder.newBuilder .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION) .setHepMatchOrder(HepMatchOrder.BOTTOM_UP) .add(FlinkStreamRuleSets.WATERMARK_TRANSPOSE_RULES) .build(), "watermark transpose") .addProgram(newFlinkChangelogModeInferenceProgram, "Changelog mode inference") .addProgram(newFlinkMiniBatchIntervalTraitInitProgram, "Initialization for mini-batch interval inference") .addProgram( FlinkHepRuleSetProgramBuilder.newBuilder .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE) .setHepMatchOrder(HepMatchOrder.TOP_DOWN) .add(FlinkStreamRuleSets.MINI_BATCH_RULES) .build(), "mini-batch interval rules") .addProgram( FlinkHepRuleSetProgramBuilder.newBuilder .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION) .setHepMatchOrder(HepMatchOrder.BOTTOM_UP) .add(FlinkStreamRuleSets.PHYSICAL_REWRITE) .build(), "physical rewrite") .build())
PhysicalNode -> ExecNode
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/** * Converts [[FlinkPhysicalRel]] DAG to [[ExecNode]] DAG, and tries to reuse duplicate sub-plans. */ @VisibleForTesting private[flink] deftranslateToExecNodePlan( optimizedRelNodes: Seq[RelNode]): util.List[ExecNode[_, _]] = { require(optimizedRelNodes.forall(_.isInstanceOf[FlinkPhysicalRel])) // Rewrite same rel object to different rel objects // in order to get the correct dag (dag reuse is based on object not digest) val shuttle = newSameRelObjectShuttle() val relsWithoutSameObj = optimizedRelNodes.map(_.accept(shuttle)) // reuse subplan val reusedPlan = SubplanReuser.reuseDuplicatedSubplan(relsWithoutSameObj, config) // convert FlinkPhysicalRel DAG to ExecNode DAG reusedPlan.map(_.asInstanceOf[ExecNode[_, _]]) }